Fivetran 2024年2月新機能:Fivetranからdbt Cloudのジョブをトリガーで実行できるようになりました #Fivetran
こんにちは、スズです。
2024年2月に、Fivetranとdbt Cloudの連携機能が発表されました。
この新機能により、FivetranのTransformationからdbt Cloudと連携し、dbt CloudのジョブをFivetranからトリガーで実行できるようになりました。早速試してみましたので、本記事でご紹介いたします。
※本機能は2024年2月時点でベータ版となっております。
事前準備
Fivetranの設定
Destination
データの同期先となるDestinationとして今回はSnowflakeを使用します。SnowflakeをDestinationに設定する方法は、以下のブログにてご紹介しております。
Connector
データの同期元はGoogleスプレッドシートを使用します。ConnectorにGoogleスプレッドシートを設定する方法は、以下のブログにてご紹介しております。
今回は使用するデータをそれぞれGoogleスプレッドシートにアップロードし、Connectorで接続しています。
使用するデータ
今回使用するデータは、dbt公式のドキュメント『Quickstart for dbt Cloud and Snowflake』にあるデータを使用しています。
GoogleスプレッドシートからSnowflakeにロードしたデータは、FIVETRAN_DATABASE
データベースの配下にあります。JAFFLE_SHOP
スキーマにあるCUSTOMERS
とORDERS
のテーブルが、今回dbt Cloudを使ったデータの変換に使用する元のデータとなります。
CUSTOMERS
テーブルのデータは以下のようになっています。
ORDERS
テーブルのデータは以下のようになっています。
dbt Cloudの設定
プロジェクトの作成
dbt Cloudでプロジェクトを作成し、データ変換とジョブを定義します。プロジェクトは、上記の「使用するデータ」と同様に、dbt公式のドキュメント『Quickstart for dbt Cloud and Snowflake』の内容を使用しています。今回使用するdbt Cloudのプロジェクトでは、CUSTOMERS
テーブルからSTG_CUSTOMERS
テーブル、ORDERS
テーブルからSTG_ORDERS
テーブルを作成し、さらにSTG_CUSTOMERS
テーブルとSTG_ORDERS
テーブルからCUSTOMERS
テーブルを新しく作成する、ということを行っています。
motels/stg_customers.sql
select id as customer_id, first_name, last_name from {{ source('jaffle_shop', 'customers') }}
motels/stg_orders.sql
select id as order_id, user_id as customer_id, order_date, status from {{ source('jaffle_shop', 'orders') }}
motels/customers.sql
with customers as ( select * from {{ ref('stg_customers') }} ), orders as ( select * from {{ ref('stg_orders') }} ), customer_orders as ( select customer_id, min(order_date) as first_order_date, max(order_date) as most_recent_order_date, count(order_id) as number_of_orders from orders group by 1 ), final as ( select customers.customer_id, customers.first_name, customers.last_name, customer_orders.first_order_date, customer_orders.most_recent_order_date, coalesce(customer_orders.number_of_orders, 0) as number_of_orders from customers left join customer_orders using (customer_id) ) select * from final
その他の設定として、dbt Cloudで変換したデータの出力先は、DBT_FIVETRAN_TEST
という名前のデータベースを指定しています。また、dbt build
コマンドを実行するジョブを作成しています。Fivetranからトリガーでこのジョブを実行することを想定しています。
Service Tokenの取得
Fivetranとの連携のために、dbt CloudのService Tokenを使用します。Service Tokenを取得するには管理者権限が必要となります。詳細はdbt Cloudのドキュメントをご参照ください。
なお、今回はdbt CloudのProfile settings(Account settings > Perfonal profile)にてAPI Keyを取得して使用しています。
Transformationの設定
ここからFivetranのTransformationでdbt Cloudと連携してみます。Fivetranの左ペインの[Transformations]から、Transformationを設定する対象のDestinationを選択します。
Transformationを何で行うかを選択します。今回はdbt Cloud projectを選択します。
Fivetranからdbt Cloudにアクセスするための設定を行います。
- Region: dbt Cloudのリージョン
URLがhttps://cloud.getdbt.com
の場合はUS region
、https://emea.dbt.com
の場合はEurope region
- Service Token: dbt Cloudのサービストークン
- Account: dbt Cloudのアカウントを選択
- Project: dbt Cloudのプロジェクトを選択
[Service Token]を入力した後、[Authenticate]をクリックし認証に成功すると、[Account]が表示されドロップダウンで選択できるようになります。選択した[Account]にあわせて、[Project]を選択できるようになります。[Project]を選択後、[Check project]をクリックしてチェックしたのち、[Save & Test]をクリックして先に進みます。
テストに成功しました。[Transformations]をクリックします。
ここまででFivetranとdbt Cloudの接続設定が完了しました。次に、[Add Transformation] > [dbt Cloud]をクリックしてトリガーで実行するジョブを設定していきます。
Select jobにあるプルダウンメニューから対象のジョブを選択します。
ジョブの実行方法として[Integrated]または[Custom]のどちらかを選びます。[Custom]の場合、ジョブを実行する曜日や頻度、実行時刻を設定できます。
今回は[Integrated]を選択します。Integratedの場合、Integratedに設定したConnectorの同期にあわせてジョブが実行されます。今回は「jaffle_shop.customers」というConnectorを設定しています。設定後、[Save & Run]をクリックします。
Transformationsの画面に戻り、先ほど設定したTransformationが追加されていることが確認できます。この時点ではジョブは実行されておらず、StatusがPendingになっています。
Connectorを同期してみる
Transformationの設定が完了しましたので、Connectorを同期し、ジョブが実行されるかを確認します。Transformationに設定したConnectorを表示し、[SYNC NOW]をクリックして同期を実行します。
しばらくしてTransformationを確認すると、StatusがSucceededと表示されています。
dbt Cloudでもジョブが実行されていることが確認できました。
Snowflakeを確認すると、FIVETRAN_DBT_TRANSFORMATION
スキーマの配下にCUSTOMERS
、STG_CUSTOMERS
、STG_ORDERS
のテーブルが作成され、dbtで変換したデータが出力されていました。
最後に
Fivetranとdbt Cloudの連携機能をご紹介しました。この新機能により、データのロードからdbt Cloudでのデータの変換までのスケジュールをFivetranで調整できるようになりました。1つのプラットフォーム上でスケジュール管理でき、便利な機能ではないでしょうか。